/*
 * Copyright (C) 2016 Piotr Wittchen
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.github.pwittchen.reactivenetwork.library.rx2.internet.observing.strategy;

import com.github.pwittchen.reactivenetwork.library.rx2.Preconditions;
import com.github.pwittchen.reactivenetwork.library.rx2.internet.observing.InternetObservingStrategy;
import com.github.pwittchen.reactivenetwork.library.rx2.internet.observing.error.ErrorHandler;
import com.github.pwittchen.reactivenetwork.library.rx2.utils.Log;
import com.jakewharton.nopen.annotation.Open;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import io.reactivex.rxjava3.core.SingleOnSubscribe;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

/**
 * Socket strategy for monitoring connectivity with the Internet.
 * It monitors Internet connectivity by opening a TLS socket connection to the remote host,
 * sending a minimal HTTP request and checking that the host sends anything back.
 */
@Open
public class SocketInternetObservingStrategy implements InternetObservingStrategy {
    private static final String TAG = SocketInternetObservingStrategy.class.getSimpleName();
    private static final String DEFAULT_HOST = "www.alibaba.com";
    private static final String HTTP_PROTOCOL = "http://";
    private static final String HTTPS_PROTOCOL = "https://";
    /** Line terminator required by HTTP, independent of the platform line separator. */
    private static final String CRLF = "\r\n";

    /**
     * Returns the host that is probed when the caller does not supply one.
     *
     * @return default ping host
     */
    @Override
    public String getDefaultPingHost() {
        return DEFAULT_HOST;
    }

    /**
     * Periodically probes the given host and emits the connectivity state whenever it changes.
     *
     * @param initialIntervalInMs delay before the first probe, must be greater than or equal to zero
     * @param intervalInMs        delay between consecutive probes, must be greater than zero
     * @param host                host to probe; a leading {@code http://} or {@code https://} is removed
     * @param port                port to connect to, must be greater than zero
     * @param timeoutInMs         connect and read timeout for a single probe, must be greater than zero
     * @param httpResponse        not used by this strategy
     * @param errorHandler        receives errors raised while closing the socket, must not be null
     * @return Observable emitting {@code true} when the host answered and {@code false} otherwise;
     *     consecutive identical values are suppressed
     */
    @Override
    public Observable<Boolean> observeInternetConnectivity(
            final int initialIntervalInMs,
            final int intervalInMs,
            final String host,
            final int port,
            final int timeoutInMs,
            final int httpResponse,
            final ErrorHandler errorHandler) {
        Preconditions.checkGreaterOrEqualToZero(initialIntervalInMs, "initialIntervalInMs is not a positive number");
        Preconditions.checkGreaterThanZero(intervalInMs, "intervalInMs is not a positive number");
        checkGeneralPreconditions(host, port, timeoutInMs, errorHandler);

        final String adjustedHost = adjustHost(host);

        return Observable.interval(initialIntervalInMs, intervalInMs, TimeUnit.MILLISECONDS, Schedulers.io())
                .map(
                        new Function<Long, Boolean>() {
                            @Override
                            public Boolean apply(@NonNull Long tick) throws Exception {
                                return isConnected(adjustedHost, port, timeoutInMs, errorHandler);
                            }
                        })
                .distinctUntilChanged();
    }

    /**
     * Probes the given host once.
     *
     * @param host         host to probe; a leading {@code http://} or {@code https://} is removed
     * @param port         port to connect to, must be greater than zero
     * @param timeoutInMs  connect and read timeout for the probe, must be greater than zero
     * @param httpResponse not used by this strategy
     * @param errorHandler receives errors raised while closing the socket, must not be null
     * @return Single emitting {@code true} when the host answered and {@code false} otherwise
     */
    @Override
    public Single<Boolean> checkInternetConnectivity(
            final String host,
            final int port,
            final int timeoutInMs,
            final int httpResponse,
            final ErrorHandler errorHandler) {
        checkGeneralPreconditions(host, port, timeoutInMs, errorHandler);

        // Same normalisation as observeInternetConnectivity, so both entry points accept URL-style hosts.
        final String adjustedHost = adjustHost(host);

        return Single.create(
                new SingleOnSubscribe<Boolean>() {
                    @Override
                    public void subscribe(@NonNull SingleEmitter<Boolean> emitter) throws Exception {
                        emitter.onSuccess(isConnected(adjustedHost, port, timeoutInMs, errorHandler));
                    }
                });
    }

    /**
     * Adjusts host to the needs of SocketInternetObservingStrategy by removing a leading
     * {@code http://} or {@code https://} prefix. Only the prefix is removed; any later
     * occurrence of the same text inside the host string is left untouched.
     *
     * @param host original host
     * @return transformed host
     */
    protected String adjustHost(final String host) {
        if (host.startsWith(HTTP_PROTOCOL)) {
            return host.substring(HTTP_PROTOCOL.length());
        }
        if (host.startsWith(HTTPS_PROTOCOL)) {
            return host.substring(HTTPS_PROTOCOL.length());
        }
        return host;
    }

    /** Validates the arguments shared by both public entry points. */
    private void checkGeneralPreconditions(String host, int port, int timeoutInMs, ErrorHandler errorHandler) {
        Preconditions.checkNotNullOrEmpty(host, "host is null or empty");
        Preconditions.checkGreaterThanZero(port, "port is not a positive number");
        Preconditions.checkGreaterThanZero(timeoutInMs, "timeoutInMs is not a positive number");
        Preconditions.checkNotNull(errorHandler, "errorHandler is null");
    }

    /**
     * Checks if device is connected to given host at given port.
     *
     * <p>The TCP connection is opened on a plain socket so that {@code timeoutInMs} bounds the
     * connect; the TLS socket is then layered on top of it. A failure to resolve or reach the
     * host is reported as {@code false} rather than thrown, so a periodic observer keeps running
     * while the device is offline.
     *
     * @param host         to connect
     * @param port         to connect
     * @param timeoutInMs  connection timeout; the public entry points guarantee it is positive
     * @param errorHandler error handler for socket connection
     * @return true if connected and false if not
     * @throws IOException never thrown by this implementation; kept in the signature so that
     *     existing overriding subclasses continue to compile
     */
    protected boolean isConnected(
            final String host, final int port, final int timeoutInMs, final ErrorHandler errorHandler) throws IOException {
        final InetSocketAddress address = new InetSocketAddress(host, port);
        final Socket plainSocket = new Socket();
        final SSLSocket secureSocket;
        try {
            // SO_REUSEADDR only has a defined effect when set before the socket is bound/connected.
            plainSocket.setReuseAddress(true);
            plainSocket.connect(address, timeoutInMs);

            final SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
            // autoClose = true: closing the TLS socket also closes the underlying plain socket.
            secureSocket = (SSLSocket) factory.createSocket(plainSocket, host, port, true);
        } catch (IOException e) {
            closeSocket(plainSocket, errorHandler);
            return false;
        }
        return isConnected(secureSocket, host, port, timeoutInMs, errorHandler);
    }

    /**
     * Checks if device is connected to given host at given port using the supplied socket.
     * Performs the TLS handshake, sends {@code GET / HTTP/1.0} and reports success as soon as
     * the first line of a response has been read. The socket is always closed before returning.
     *
     * @param socket       to connect
     * @param host         to connect
     * @param port         to connect
     * @param timeoutInMs  connection timeout, also applied as read timeout for handshake and response
     * @param errorHandler error handler for socket connection
     * @return true if connected and false if not
     */
    protected boolean isConnected(
            final SSLSocket socket,
            final String host,
            final int port,
            final int timeoutInMs,
            final ErrorHandler errorHandler) {
        boolean isConnected = false;
        try {
            // Without a read timeout a stalled peer would block the handshake or readLine forever.
            socket.setSoTimeout(timeoutInMs);
            socket.startHandshake();

            final PrintWriter out = new PrintWriter(
                    new BufferedWriter(
                            new OutputStreamWriter(
                                    socket.getOutputStream())));

            // HTTP mandates CRLF line endings; println would emit the platform separator instead.
            out.print("GET / HTTP/1.0" + CRLF + CRLF);
            out.flush();

            // PrintWriter swallows IOExceptions, so a failed write is only visible via checkError().
            if (out.checkError()) {
                Log.error(TAG, "SSLSocketClient:  java.io.PrintWriter error");
            }

            final BufferedReader in = new BufferedReader(
                    new InputStreamReader(
                            socket.getInputStream()));

            // One line of response is enough evidence of connectivity; the rest of the body is
            // not downloaded. The streams are released when the socket is closed below.
            isConnected = in.readLine() != null;
        } catch (IOException e) {
            isConnected = false;
        } finally {
            closeSocket(socket, errorHandler);
        }
        return isConnected;
    }

    /** Closes the socket and forwards a failure to close to the error handler. */
    private void closeSocket(final Socket socket, final ErrorHandler errorHandler) {
        try {
            socket.close();
        } catch (IOException exception) {
            errorHandler.handleError(exception, "Could not close the socket");
        }
    }
}
